package com.cloudansys.core.flink.function;

import com.alibaba.fastjson.JSON;
import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.EvennessEntity;
import com.cloudansys.core.entity.LiftDataEntity;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.MqttUtil;
import com.cloudansys.core.util.NumUtil;
import com.cloudansys.core.util.SerializeUtil;
import com.cloudansys.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.eclipse.paho.client.mqttv3.MqttClient;
import redis.clients.jedis.Jedis;

import java.util.*;

@Slf4j
public class EvennessProcessor extends ProcessWindowFunction<List<MultiDataEntity>, List<MultiDataEntity>, String, TimeWindow> {

    private static Jedis jedis;
    private static MqttClient mqttClient;
    private static String topic_data_pz;
    private static String topic_data_evenness;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
        mqttClient = DefaultConfig.getMqttClient();
        topic_data_pz = DefaultConfig.get(Const.MQTT_TOPIC_DATA_PZ);
        topic_data_evenness = DefaultConfig.get(Const.MQTT_TOPIC_DATA_EVENNESS);
    }

    /**
     * 把平整度数据，通讯编号为36到53的传感器数据缓存到 redis 中，实时更新（一秒更新一次）
     * 首先更新未减复位值的原始数据，一秒更新一个瞬时值
     * 然后更新减去复位值后的数值，一秒更新一次，每次包括这一秒内的所有数据
     *
     *最终修改：平整度，返回的高度拆分成一个高度对应一个平整度
     *高度拆分原理：这次的高度减去上次的高度除以平整度的个数作为拆分后每两个相邻高度的差值，
     *      拆分后的起始高度为上次高度，终止高度为此次高度。
     *
     * 平整度数据推送逻辑：
     * 如果存储的上次高度比当前高度还小50毫米（也就是，提升时，允许高度有回落，但正常情况不应大于某个范围，
     * 这个范围我们暂定50毫米），就把上次高度修改为当前高度。
     */
    @Override
    public void process(String key, Context context, Iterable<List<MultiDataEntity>> elements, Collector<List<MultiDataEntity>> out) throws Exception {
        // 一个传感器的数据放到一个 list 中，key 为 sc
        Map<String, Double> rawValues = new HashMap<>();
        Map<String, TreeMap<String, String>> evennessMap = new HashMap<>();
        Map<String, MultiDataEntity> objMap = new HashMap<>();
        // 把通讯编号为36到53的传感器数据缓存到 redis 中
        for (List<MultiDataEntity> element : elements) {
            // 数据时间
            String pickTime = element.get(0).getPickTime();
            for (MultiDataEntity multiDataEntity : element) {
                String serialCode = multiDataEntity.getSerialCode();
                int sc = Integer.parseInt(serialCode);
                if (sc >= 36 && sc <= 53) {
                    // 获取 redis 中存储的该传感器的复位值
                    double resetValue = getResetValue(serialCode);
                    // 页面展示数据 原始数据减去复位的数值
                    Double value = NumUtil.formatDouble(multiDataEntity.getValues()[1] - resetValue);
                    String wyValue = String.valueOf(value);
                    // 秒级瞬时值
                    if (!rawValues.containsKey(serialCode)) {
                        rawValues.put(serialCode, multiDataEntity.getValues()[1]);
                    }
                    if (evennessMap.containsKey(serialCode)) {
                        evennessMap.get(serialCode).put(pickTime, wyValue);
                    } else {
                        // 这里使用 treeMap，为了让每个传感器这一秒的数据按照时间排序
                        TreeMap<String, String> timeWithWyMap = new TreeMap<>();
                        timeWithWyMap.put(pickTime, wyValue);
                        evennessMap.put(serialCode, timeWithWyMap);
                    }
                }
            }
        }
        // 秒级瞬时原始值存储 redis
        for (String sc : rawValues.keySet()) {
            String redisKey = Const.EVEN_RAW + sc;
            String value = String.valueOf(rawValues.get(sc));
            jedis.set(redisKey, value);
        }

        // 组织每个平整度传感器这一秒内的数据，数据按照时间排序，
        Map<String, List<String>> scWithEvenMap = new HashMap<>();
        for (String sc : evennessMap.keySet()) {
            // 只取出数据，不要时间
            Collection<String> values = evennessMap.get(sc).values();
            scWithEvenMap.put(sc, new ArrayList<>(values));
        }

        // 获取当前提升阶段
        String projectId = jedis.get(Const.PID);
        // 获取当前高度
        String ts = jedis.get(Const.TS_REDIS);
        if (ts == null) {
            return;
        }
        // 获取上次高度
        String last_ts = jedis.get(Const.EVEN_LAST_TS);
        // 第一次的上次高度即初始高度
        if (last_ts == null) {
            last_ts = String.valueOf(Const.TS_BASE);
        }
//        log.info("----------ts: {} last_ts: {}", ts, last_ts);
        // 把结果实时推送到 mqtt
        // 此次高度减去上次高度的差值，即此次提升高度
        double difValue = NumUtil.formatDouble(getDifValue(ts, last_ts) * 1000);
        // 如果存储的上次高度和当前高度的差值的绝对值大于50毫米，就把上次高度修改为当前高度。
        if (Math.abs(difValue) > 50) {
            jedis.set(Const.EVEN_LAST_TS, ts);
        }
        log.info("====【difValue: {}】====", difValue);
        // 这次高度减去上次高度之差小于等于2mm，则这次高度不推送平整度数据（并且只有每次推送数据才更新上次的高度）
        if (difValue > 2 && difValue < 1000) {
//            log.info("----------diffValue: {}", getDifValue(ts, last_ts));
            // 并且只有每次推送数据才更新上次的高度
            jedis.set(Const.EVEN_LAST_TS, ts);
            // 推送平整度数据
            for (String serialCode : scWithEvenMap.keySet()) {
                List<String> valueList = scWithEvenMap.get(serialCode);
                // 数据推送到 mqtt 的主题名称
                String dataTopic = StrUtil.stringsToStr(Const.SLASH, topic_data_pz, projectId, serialCode);
                // 获取 redis 缓存中对应的传感器信息
                LiftDataEntity targetInfo = getTargetInfo(projectId, serialCode);
                if (targetInfo == null) {
                    continue;
                }
                Double[] values = new Double[valueList.size()];
                for (int i = 0; i < valueList.size(); i++) {
                    values[i] = Double.parseDouble(valueList.get(i));
                }
                // 获取当前高度对应的一批高度
                Double[] heights = getBatchHeights(ts, last_ts, values.length);
                // 一个高度对应一个平整度 单独的主题
                String evennessTopic = StrUtil.stringsToStr(Const.SLASH, topic_data_evenness, projectId, serialCode);

                // 平整度传感器数据需要组织成一个高度对应一批平整度数据
                EvennessEntity evennessEntity = EvennessEntity.builder()
                        .projectId(projectId)
                        .targetCode(targetInfo.getTargetCode())
                        .targetCodeAbbr(targetInfo.getTargetCodeAbbr())
                        .typeTag(targetInfo.getTypeTag())
                        .typeName(targetInfo.getTypeName())
                        .typeId(targetInfo.getTypeId())
                        .serialCode(targetInfo.getSerialCode())
                        .site(targetInfo.getSite())
                        .height(ts)
                        .heights(heights)
                        .evenness(values)
                        .build();
                String jsonString = JSON.toJSONString(evennessEntity);
                sendToMqtt(dataTopic, jsonString);
                sendToMqtt(evennessTopic, jsonString);
                log.info("====【evenness: {}】====", jsonString);
            }
        }
    }

    /**
     * @param ts 此次高度
     * @param last_ts 上次高度
     * @param length 平整度的个数
     * @return 此次高度对应的一批高度
     */
    private Double[] getBatchHeights(String ts, String last_ts, int length) {
        // 上次高度
        double h1 = Double.parseDouble(last_ts);
        // 此次高度
        double h2 = Double.parseDouble(ts);
        // 这次的高度减去上次的高度除以平整度的个数作为拆分后每两个相邻高度的差值
        double difValue = getDifValue(ts, last_ts) / length;
        Double[] heights = new Double[length];
        for (int i = 0; i < heights.length; i++) {
            heights[i] = NumUtil.formatDouble(h1 + difValue * (i + 1));
        }
        return heights;
    }

    /**
     * 数据推送
     */
    private void sendToMqtt(String topic, String msg) {
        try {
            // 向 mqtt 推送数据
            MqttUtil.sendToMqtt(mqttClient, topic, msg);
//            log.info("##### send to mqtt success #####");
        } catch (Exception e) {
            log.info("##### send to mqtt failed【data】#####");
            e.printStackTrace();
        }
    }

    /**
     * 获取 redis 缓存中对应的传感器信息
     * 【key -> hk:bi:tgt:projectId_serialCode】，
     */
    private LiftDataEntity getTargetInfo(String projectId, String serialCode) {
        // 获取 redis 缓存中对应的传感器信息
        String tgtKey = "hk:bi:tgt:" + projectId + Const.BAR_BOTTOM + serialCode;
        String s = jedis.get(tgtKey);
        if (s == null) {
            log.error("获取传感器基础信息失败！redisKey 为：{}", tgtKey);
            return null;
        }
        return (LiftDataEntity) SerializeUtil.deserialize(s.getBytes());
    }

    /**
     * @param ts      当前高度（单位 m）
     * @param last_ts 上次推送时的高度（单位 m）
     * @return 当前高度与上次高度的差值（单位 m）
     */
    private double getDifValue(String ts, String last_ts) {
        return Double.parseDouble(ts) - Double.parseDouble(last_ts);
    }

    /**
     * 获取复位值，默认值 0
     */
    private double getResetValue(String serialCode) {
        String redisKey = Const.EVEN_RESET + serialCode;
        String resetValue = jedis.get(redisKey);
        return Double.parseDouble(resetValue);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
    }

}
